使用Kyuubi JDBC访问Lakesoul表
提示
从 2.4 版本起提供。
LakeSoul实现了Flink/Spark Connector。我们可以通过Kyuubi使用Spark/Flink SQL来查询访问Lakesoul.
环境要求
组件 | 版本 |
---|---|
Kyuubi | 1.8 |
Spark | 3.3 |
Flink | 1.20 |
LakeSoul | 3.0.0 |
Java | 1.8 |
运行环境为Linux环境,并已安装Spark, Flink, Kyuubi,推荐Kyuubi Engine以Hadoop Yarn作为执行环境,当然也可以本地启动Spark/Flink Local集群。
可参考Kyuubi安装文档: Deploy Kyuubi engines on Yarn.
Flink SQL访问Lakesoul表
1. 依赖
下载LakeSoul Flink Jar: https://github.com/lakesoul-io/LakeSoul/releases/download/v3.0.0/lakesoul-flink-1.20-3.0.0.jar
将该jar拷贝至 $FLINK_HOME/lib
.
2. Flink配置项
请根据如下文档来设置连接LakeSoul元数据库需要的PG参数: Setup Metadata Database Connection for Flink
在此之后,您可以像往常一样启动 Flink Session集群或Flink Application。
3. 操作LakeSoul
使用Kyuubi beeline来连接Flink SQL Engine:
$KYUUBI_HOME/bin/beeline -u 'jdbc:hive2://localhost:10009/default;user=admin;?kyuubi.engine.type=FLINK_SQL'
使用Flink SQL操作LakeSoul:
create catalog lakesoul with('type'='lakesoul');
use catalog lakesoul;
use `default`;
create table if not exists test_lakesoul_table_v1 (`id` INT, name STRING, score INT,`date` STRING,region STRING, PRIMARY KEY (`id`,`name`) NOT ENFORCED ) PARTITIONED BY (`region`,`date`) WITH ( 'connector'='lakeSoul', 'use_cdc'='true','format'='lakesoul', 'path'='hdfs:///lakesoul-test-bucket/default/test_lakesoul_table_v1/', 'hashBucketNum'='4');
insert into `lakesoul`.`default`.test_lakesoul_table_v1 values (1,'AAA', 100, '2023-05-11', 'China');
insert into `lakesoul`.`default`.test_lakesoul_table_v1 values (2,'BBB', 100, '2023-05-11', 'China');
insert into `lakesoul`.`default`.test_lakesoul_table_v1 values (3,'AAA', 98, '2023-05-10', 'China');
select * from `lakesoul`.`default`.test_lakesoul_table_v1 limit 1;
drop table `lakesoul`.`default`.test_lakesoul_table_v1;
可以将数据存储路径中的 hdfs://
替换为 file://
。
详细的Flink SQL操作Lakesoul内容参阅 : Flink Lakesoul Connector